package cn.gupao.udfs;

import cn.gupao.service.QueryService;
import cn.gupao.utils.RuleSimulator;
import cn.gupao.pojo.*;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class RulesMatchFunctionV2 extends KeyedProcessFunction<String, LogBean, MatchResult> {

    public RulesMatchFunctionV2() {
    }

    private QueryService queryService;

    private RuleCondition ruleCondition;

    private transient ListState<EventStateBean> eventsState;

    private transient MapState<Long, TimerCondition> timerConditionMapState;


    @Override
    public void open(Configuration parameters) throws Exception {
        //获取客户端发送的全局参数(通过运行时上下文，获取客户端发送的全局参数)
        ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

        //定义状态描述器
        ListStateDescriptor<EventStateBean> eventStateDescriptor = new ListStateDescriptor<>("event-state", EventStateBean.class);
        //设置TTL
        StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.hours(2)).build();
        eventStateDescriptor.enableTimeToLive(stateTtlConfig);
        //初始化或恢复状态
        eventsState = getRuntimeContext().getListState(eventStateDescriptor);

        //定义一个状态描述器，用来保存时间触发条件
        MapStateDescriptor<Long, TimerCondition> timerConditionStateDescriptor = new MapStateDescriptor<>("timer-condition-state", Long.class, TimerCondition.class);
        timerConditionMapState = getRuntimeContext().getMapState(timerConditionStateDescriptor);


        queryService = new QueryService();
        //将参数传入到init方法中
        queryService.init(parameterTool, eventsState);
        //获取事先定义好的规则(以后可以从广播状态中获取）
        ruleCondition = RuleSimulator.getRule();

    }

    @Override
    public void processElement(LogBean bean, Context ctx, Collector<MatchResult> out) throws Exception {

        //将事件数据添加到状态中
        eventsState.add(bean.toEventStateBean());

        //匹配规则
        boolean flag = queryService.isMatch(bean, ruleCondition);

        //匹配上结果
        if (flag) {

            //判断是否是基于时间定时器触发的规则

            if(!ruleCondition.isHasTimer()) {
                //没有配置基于时间触发的定时器（正常输出匹配的结果）
                //输出匹配结果
                out.collect(new MatchResult(bean.getDeviceId(), bean.getTimeStamp(), System.currentTimeMillis(), ruleCondition.getId()));

            } else {
                //有事先配置了基于时间触发的规则

                TimerCondition timerCondition = ruleCondition.getTimerCondition();
                Long lateTime = timerCondition.getLateTime();
                //获取用户当前事件的时间
                long eventTime = bean.getTimeStamp();
                //定时器触发的时间
                long triggerTime = eventTime + lateTime;
                //注册定时器
                ctx.timerService().registerEventTimeTimer(triggerTime);
                //未来前触发的规则条件保存到状态中
                timerConditionMapState.put(triggerTime, timerCondition);
            }
        }


    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<MatchResult> out) throws Exception {

        //获取事先注册好的规则，查询状态和数据库（Redis+DorisDB）
        //根据触发的时间，即onTimer方法传入的timestamp，到状态中查找相应的规则
        TimerCondition timerCondition = timerConditionMapState.get(timestamp);
        if (timerCondition != null) {

            CombineCondition combineCondition = timerCondition.getCombineCondition();
            //根据定义的时间触发规则查询（先状态 -> redis -> dorisDB）
            //long startTime =  timestamp - timerCondition.getLateTime()

            //现在规定是指定的时间作为开始时间，定时器触发的时间为结束时间
            String deviceId = ctx.getCurrentKey();
            boolean isMatch = queryService.isMatchTimerCondition(deviceId, timestamp, combineCondition);
            if (isMatch) {
                out.collect(new MatchResult(deviceId, timestamp, System.currentTimeMillis(), timerCondition.getTimerRuleId()));
            }
            //移除规则
            timerConditionMapState.remove(timestamp);
        }

    }

    @Override
    public void close() throws Exception {
        queryService.destroy();
    }
}
